package cn.mrcode.wxsdk.core.dialogue.common.accessToken.lifeCycle.distributed.strategy.master;

import cn.mrcode.wxsdk.core.context.ConfigContext;
import cn.mrcode.wxsdk.core.context.SdkContexts;
import cn.mrcode.wxsdk.core.dialogue.common.LogUtil;
import cn.mrcode.wxsdk.core.dialogue.common.PublicAccount;
import cn.mrcode.wxsdk.core.dialogue.common.accessToken.AccessTokenApi;
import cn.mrcode.wxsdk.core.dialogue.common.exception.ReqException;
import cn.mrcode.wxsdk.core.dialogue.common.exception.WxException;
import cn.mrcode.wxsdk.core.dialogue.common.log.LogTemplateUtil;
import cn.mrcode.wxsdk.core.dialogue.common.util.DateUtil;
import cn.mrcode.wxsdk.core.dialogue.protocol.base.AccessTokenInfo;
import cn.mrcode.wxsdk.core.zkhelper.curator.ClientHelper;
import cn.mrcode.wxsdk.core.zkhelper.curator.ZkPathHelper;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;

/**
 * Distributed service that maintains the access-token life cycle.
 *
 * <p>Each instance connects to ZooKeeper, mirrors the token list stored at {@link #masterPath}
 * into its local caches and registers a {@code LeaderListener}. The listener's hooks
 * ({@code readyTaskData} / {@code run}) fetch missing tokens, renew expiring ones and publish
 * the resulting list back to {@link #masterPath}; they are invoked by
 * {@code LifeCycleLeaderSelectorListener}, which is not visible in this file (presumably once
 * this instance becomes leader).
 *
 * <p>NOTE(review): several info-level log statements serialise complete {@code AccessTokenInfo}
 * objects, which may contain live tokens; consider masking them.
 *
 * @author zhuqiang
 * @version V1.0
 * @date 2015/9/22 17:22
 */
public class AccessTokenDistributedTaskMaster{
    private static final Logger log = LoggerFactory.getLogger(AccessTokenDistributedTaskMaster.class);

    /** Pause before retrying a token whose renewal failed, so a failing API is not hammered. */
    private static final long REFRESH_RETRY_BACKOFF_MILLIS = 5000L;

    // Local cache: the same map instance that the caller passed in, updated in place.
    private final ConcurrentHashMap<String, AccessTokenInfo> accessTokenMap;
    private final HashMap<String, PublicAccount> publicAccountMap;
    private final int sessionTimeout;
    private final String zkServiceList;
    private final String id;
    final String apiName = "分布式AccessTokenDistributedTaskMaster";
    public static final String masterPath = ConfigContext.masterPath_basetoken;
    public static final String runingPath = ConfigContext.runIngPath;

    private CuratorFramework client; // ZooKeeper connection, created in init()
    // Tokens handled by the refresh task; serialised as one JSON array and uploaded to ZooKeeper.
    private final ConcurrentHashMap<String, AccessTokenInfo> taskMap = new ConcurrentHashMap<>();

    /**
     * Creates the service; no connection is opened until {@link #init()} is called.
     *
     * @param sessionTimeout   timeout handed to {@code ClientHelper.createClient}
     * @param zkServiceList    ZooKeeper server list handed to {@code ClientHelper.createClient}
     * @param publicAccountMap public accounts keyed by appId whose tokens are to be maintained
     * @param accessTokenMap   local token cache keyed by appId; updated in place
     */
    public AccessTokenDistributedTaskMaster(int sessionTimeout, String zkServiceList, HashMap<String, PublicAccount> publicAccountMap, ConcurrentHashMap<String, AccessTokenInfo> accessTokenMap) {
        this.sessionTimeout = sessionTimeout;
        this.zkServiceList = zkServiceList;
        this.publicAccountMap = publicAccountMap;
        this.accessTokenMap = accessTokenMap;
        this.id = SdkContexts.getConfigContext().getId();
    }

    /**
     * Initialises the task:
     * <ol>
     *   <li>create the ZooKeeper connection;</li>
     *   <li>create the required nodes with empty initial data;</li>
     *   <li>pull the current ZooKeeper data into the local caches;</li>
     *   <li>register the leader listener;</li>
     *   <li>watch the master node so that published updates are picked up as they happen.</li>
     * </ol>
     *
     * @throws Exception if any ZooKeeper operation fails
     */
    public void init() throws Exception {
        client = ClientHelper.createClient(zkServiceList, sessionTimeout);
        log.info(LogTemplateUtil.svMsg(apiName, "%s 链接zookpeer服务器成功", id));

        ZkPathHelper.createIfExistsNot(client, masterPath, new byte[0]); // node holding the maintained tokens
        ZkPathHelper.createIfExistsNot(client, runingPath, new byte[0]); // node holding the running leader's info

        refresh(utf8String(client.getData().forPath(masterPath))); // initial pull from ZooKeeper
        new LeaderListener(client, masterPath, id);
        final NodeCache nodeCache = new NodeCache(client, masterPath, false);
        nodeCache.start(true);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                // getCurrentData() is null when the node does not exist (e.g. it was deleted);
                // keep the last known tokens in that case instead of failing with an NPE.
                if (nodeCache.getCurrentData() == null) {
                    return;
                }
                refresh(utf8String(nodeCache.getCurrentData().getData()));
            }
        });
    }

    /**
     * Decodes ZooKeeper node data as UTF-8 text.
     *
     * @param raw node data, may be {@code null}
     * @return the decoded text, or an empty string when {@code raw} is {@code null}
     */
    private static String utf8String(byte[] raw) {
        return raw == null ? "" : new String(raw, StandardCharsets.UTF_8);
    }

    /**
     * Deserialises ZooKeeper data and applies it to the local caches.
     *
     * <p>Blank data is ignored. Otherwise every parsed token is stored in both
     * {@code accessTokenMap} and {@code taskMap}, and entries of {@code taskMap} that are absent
     * from the data are removed.
     *
     * @param data JSON array of {@code AccessTokenInfo} read from ZooKeeper
     */
    private void refresh(String data) {
        if (StringUtils.isBlank(data)) {
            return;
        }

        // Parse before touching the caches so that malformed data leaves them untouched.
        List<AccessTokenInfo> list = JSON.parseArray(data, AccessTokenInfo.class);
        if (list == null) {
            return;
        }
        Map<String, AccessTokenInfo> fresh = new HashMap<>();
        for (AccessTokenInfo ati : list) {
            if (ati == null || ati.getAppID() == null) {
                continue; // ConcurrentHashMap accepts neither null keys nor null values
            }
            fresh.put(ati.getAppID(), ati);
        }
        accessTokenMap.putAll(fresh);
        // Add first and prune afterwards: taskMap is read concurrently by the refresh task and
        // must never be observed empty halfway through an update.
        taskMap.putAll(fresh);
        taskMap.keySet().retainAll(fresh.keySet());
        log.info(LogTemplateUtil.svMsg(apiName, "id=%s;将新数据已刷入本地缓存=%s", id, JSONObject.toJSONString(list)));
    }

    /**
     * Leader hooks: prepares the task data and runs the token refresh loop.
     */
    private class LeaderListener extends LifeCycleLeaderSelectorListener{
        // Rebuilt from taskMap at the start of every loop iteration in run().
        private DelayQueue<AccessTokenInfo> queue = new DelayQueue<>();

        public LeaderListener(CuratorFramework client, String leaderPath, String id) {
            super(client, leaderPath, id);
        }

        @Override
        protected void refreshData(String data) {
            refresh(data);
        }

        /**
         * Records this instance's id at {@code runingPath}, fetches a token for every configured
         * public account that is not yet in {@code taskMap} and uploads the resulting list to
         * {@code masterPath}.
         *
         * @throws Exception if a ZooKeeper write fails
         */
        @Override
        protected void readyTaskData() throws Exception {
            client.setData().forPath(runingPath, id.getBytes(StandardCharsets.UTF_8));
            log.info(LogTemplateUtil.svMsg(apiName, "初始化task任务数据"));
            for (Map.Entry<String, PublicAccount> ent : publicAccountMap.entrySet()) {
                String appId = ent.getKey();
                if (taskMap.containsKey(appId)) {
                    continue; // already maintained, nothing to fetch
                }
                AccessTokenInfo accessTokenInfo = getAccessTokenInfo(appId, ent.getValue().getAppSecret());
                if (accessTokenInfo == null) {
                    // Log the appId only: the account entry carries the app secret.
                    log.error("获取token出错，该公众号本次不纳入维护列表：" + appId);
                } else {
                    taskMap.put(appId, accessTokenInfo);
                }
            }
            // Upload what has been collected so far.
            client.setData().forPath(masterPath,
                    JSONObject.toJSONString(new ArrayList<>(taskMap.values())).getBytes(StandardCharsets.UTF_8));
        }

        /**
         * Refresh loop: waits for the next token in {@code taskMap} to expire, renews it and
         * uploads the updated list. Any exception ends the loop after calling
         * {@code leaderClose()}.
         */
        @Override
        protected void run() {
            try {
                while (true) {
                    // Re-sync the queue with taskMap on every pass.
                    queue.clear();
                    queue.addAll(taskMap.values());
                    log.info(LogTemplateUtil.svMsg(apiName, "更新任务列表数量预览：taskMap.size=%s;queue.size=%s", taskMap.size() + "", queue.size() + ""));
                    AccessTokenInfo ati = queue.take();  // blocks until an entry has expired
                    log.info(LogTemplateUtil.svMsg(apiName, "id=%s;获取到超时AccessTokenInfo，数据=%s；超时时间=%s",
                            id, JSONObject.toJSONString(ati), DateUtil.lFormat(ati.getTimeOut())));
                    String appID = ati.getAppID();
                    if (!taskMap.containsKey(appID)) {
                        log.error(LogTemplateUtil.svMsg(apiName, "%s,taskMap中不包含该appid(%s),忽略处理", id, appID));
                        continue;
                    }
                    AccessTokenInfo tempAti = getAccessTokenInfo(appID, ati.getAppSecret());
                    if (tempAti == null) {
                        log.error("获取token出错，该过期token本次忽略处理：" + JSON.toJSONString(ati));
                        taskMap.put(appID, ati);
                        // The entry is still expired and would be taken again immediately;
                        // back off so a failing token API is not called in a tight loop.
                        Thread.sleep(REFRESH_RETRY_BACKOFF_MILLIS);
                        continue;
                    }
                    taskMap.put(appID, tempAti);
                    log.info(LogTemplateUtil.svMsg(apiName, "id=%s;appID=%s 已经更新；新的AccessTokenInfo数据=%s；超时时间=%s",
                            id, appID, JSONObject.toJSONString(tempAti), DateUtil.lFormat(tempAti.getTimeOut())));
                    // Publish the updated list to ZooKeeper.
                    client.setData().forPath(masterPath,
                            JSONObject.toJSONString(new ArrayList<AccessTokenInfo>(taskMap.values())).getBytes(StandardCharsets.UTF_8));
                }
            } catch (InterruptedException e) {
                leaderClose();
                log.error(LogTemplateUtil.svMsg(apiName, "id=%s;任务被中断,本机器已经放弃了leader领导权，异常信息=%s", id, e.toString()), e);
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                leaderClose();
                log.error(LogTemplateUtil.svMsg(apiName, "id=%s;任务被中断,本机器已经放弃了leader领导权，异常信息=%s", id, e.toString()), e);
            }
        }

        /**
         * Requests a new token for the given account.
         *
         * @param appID     account appId
         * @param appSecret account secret
         * @return the new token info, or {@code null} if the request failed (the failure is logged)
         */
        private AccessTokenInfo getAccessTokenInfo(String appID,String appSecret) {
            try {
                return AccessTokenApi.getAccessTokenInfo(appID, appSecret, id);
            } catch (ReqException e) {
                // The error text is passed as an argument, not spliced into the template, so a
                // '%' inside it cannot be misread as a format specifier.
                log.error(LogTemplateUtil.svMsg(apiName, "id=%s;appid=%s;%s", id, appID, String.valueOf(LogUtil.fromateLog(e))));
            } catch (WxException e) {
                log.error(LogTemplateUtil.svMsg(apiName, "id=%s;appid=%s;%s", id, appID, String.valueOf(LogUtil.fromateLog(e))));
            }
            return null;
        }
    }
}
